# OpenManus 实现细节
深入源码,理解关键功能的实现原理
## 本章概览
- LLM 调用封装:Token 管理与重试机制
- 浏览器自动化实现:browser-use 集成
- MCP 协议实现:连接外部服务
- 代码执行沙箱:安全隔离执行
- PlanningFlow:任务规划与执行
## 1. LLM 调用封装
### 1.1 单例模式设计
OpenManus 使用单例模式管理 LLM 实例,相同配置复用同一实例:
python
# app/llm.py
class LLM:
    """Chat-model client that is shared per configuration name.

    Calling ``LLM("default")`` twice returns the same object, so callers
    using one configuration reuse a single API client.
    """

    # config_name -> instance that was already built for it
    _instances: Dict[str, "LLM"] = {}

    def __new__(
        cls, config_name: str = "default", llm_config: Optional[LLMSettings] = None
    ):
        """Return the cached instance for ``config_name``, building it on first use."""
        if config_name not in cls._instances:
            instance = super().__new__(cls)
            # Initialise before caching so the cache never holds a
            # half-built object.
            instance.__init__(config_name, llm_config)
            cls._instances[config_name] = instance
        return cls._instances[config_name]

    def __init__(
        self, config_name: str = "default", llm_config: Optional[LLMSettings] = None
    ):
        """Load the settings for ``config_name`` and create the matching client."""
        # __init__ runs again on every LLM(...) call; an existing client
        # means this instance has already been set up.
        if hasattr(self, "client"):
            return

        # Resolve the settings, falling back to the "default" entry.
        llm_config = llm_config or config.llm
        llm_config = llm_config.get(config_name, llm_config["default"])
        self.model = llm_config.model
        self.max_tokens = llm_config.max_tokens
        self.temperature = llm_config.temperature
        # The client selection below reads these from ``self``, so they
        # must be set first.
        # NOTE(review): assumes the settings object exposes these four
        # fields under the same names - confirm against LLMSettings.
        self.api_type = llm_config.api_type
        self.api_key = llm_config.api_key
        self.api_version = llm_config.api_version
        self.base_url = llm_config.base_url

        # Create the client that matches the provider type.
        if self.api_type == "azure":
            self.client = AsyncAzureOpenAI(
                base_url=self.base_url,
                api_key=self.api_key,
                api_version=self.api_version,
            )
        elif self.api_type == "aws":
            self.client = BedrockClient()
        else:
            self.client = AsyncOpenAI(
                api_key=self.api_key,
                base_url=self.base_url,
            )
#### 为什么使用单例?
没有单例 使用单例
┌─────────────────────────────────┐ ┌─────────────────────────────────┐
│ │ │ │
│ Agent1 ──▶ LLM("default") │ │ Agent1 ──┐ │
│ ↓ │ │ │ │
│ 新建 OpenAI 客户端 │ │ ▼ │
│ │ │ LLM("default") ◀── 共享 │
│ Agent2 ──▶ LLM("default") │ │ ▲ │
│ ↓ │ │ │ │
│ 又新建 OpenAI 客户端 │ │ Agent2 ──┘ │
│ │ │ │
│ ❌ 资源浪费,连接开销大 │ │ ✅ 复用连接,减少开销 │
└─────────────────────────────────┘ └─────────────────────────────────┘
### 1.2 Token 计数器
精确计算 Token 用量对于控制成本和避免超限至关重要:
python
# app/llm.py
class TokenCounter:
    """Estimate how many tokens a list of chat messages will consume."""

    # Fixed token costs
    BASE_MESSAGE_TOKENS = 4  # overhead added for every message
    FORMAT_TOKENS = 2  # added once per message list
    LOW_DETAIL_IMAGE_TOKENS = 85  # flat cost of a low-detail image
    HIGH_DETAIL_TILE_TOKENS = 170  # cost of each tile of a high-detail image

    def __init__(self, tokenizer):
        # Any object with ``encode(text)`` returning a sized sequence
        # (a tiktoken encoding in the original project).
        self.tokenizer = tokenizer

    def count_text(self, text: str) -> int:
        """Return the token count of ``text``; empty or missing text costs 0."""
        if not text:
            return 0
        return len(self.tokenizer.encode(text))

    def count_image(self, image_item: dict) -> int:
        """Return the token cost of one image item.

        ``detail`` defaults to ``"medium"``. Low detail has a flat cost.
        Otherwise the cost is derived from ``dimensions`` (width, height)
        when present, and from a default estimate when it is not.
        """
        detail = image_item.get("detail", "medium")
        if detail == "low":
            return self.LOW_DETAIL_IMAGE_TOKENS

        if "dimensions" in image_item:
            width, height = image_item["dimensions"]
            return self._calculate_high_detail_tokens(width, height)

        # No size information: assume 1024x1024 for "high", a flat 1024
        # tokens for anything else.
        if detail == "high":
            return self._calculate_high_detail_tokens(1024, 1024)
        return 1024

    def _calculate_high_detail_tokens(self, width: int, height: int) -> int:
        """Return the cost of a high-detail image of the given pixel size.

        The image is fitted into 2048x2048, its short side is scaled to
        768 px, and the result is covered with 512x512 tiles. A degenerate
        image (a side of 0 or less) has no tiles and costs the base amount.
        """
        MAX_SIZE = 2048
        TARGET_SHORT_SIDE = 768
        TILE_SIZE = 512

        # Without this guard the short-side scaling below divides by zero.
        if width <= 0 or height <= 0:
            return self.LOW_DETAIL_IMAGE_TOKENS

        # Step 1: fit inside MAX_SIZE x MAX_SIZE.
        if width > MAX_SIZE or height > MAX_SIZE:
            scale = MAX_SIZE / max(width, height)
            # Truncation can shrink a very thin side to 0; keep 1 px.
            width = max(1, int(width * scale))
            height = max(1, int(height * scale))

        # Step 2: scale so the short side is TARGET_SHORT_SIDE.
        scale = TARGET_SHORT_SIDE / min(width, height)
        scaled_width = int(width * scale)
        scaled_height = int(height * scale)

        # Step 3: count the tiles needed to cover the scaled image.
        tiles_x = math.ceil(scaled_width / TILE_SIZE)
        tiles_y = math.ceil(scaled_height / TILE_SIZE)

        # Step 4: per-tile cost plus the base cost.
        return (
            tiles_x * tiles_y * self.HIGH_DETAIL_TILE_TOKENS
            + self.LOW_DETAIL_IMAGE_TOKENS
        )

    def count_message_tokens(self, messages: List[dict]) -> int:
        """Return the total token count for ``messages``.

        ``count_content`` and ``count_tool_calls`` are called on ``self``
        but are not part of this excerpt.
        """
        total = self.FORMAT_TOKENS
        for message in messages:
            total += self.BASE_MESSAGE_TOKENS
            total += self.count_text(message.get("role", ""))
            total += self.count_content(message.get("content"))
            # ``or []`` also covers a message that carries tool_calls=None.
            total += self.count_tool_calls(message.get("tool_calls") or [])
            total += self.count_text(message.get("name", ""))
            total += self.count_text(message.get("tool_call_id", ""))
        return total
### 1.3 重试机制
使用 tenacity 库实现指数退避重试:
python
from tenacity import (
    retry,
    retry_if_exception_type,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)


@retry(
    # Randomised exponential backoff, never longer than 60 seconds.
    wait=wait_random_exponential(min=1, max=60),
    # Six attempts in total: the first call plus at most five retries.
    stop=stop_after_attempt(6),
    # Retry ordinary failures, but never TokenLimitExceeded: sending the
    # same oversized request again cannot succeed.
    retry=retry_if_exception_type(Exception)
    & retry_if_not_exception_type(TokenLimitExceeded),
)
async def ask_tool(
    self,
    messages: List[Union[dict, Message]],
    tools: Optional[List[dict]] = None,
    tool_choice: TOOL_CHOICE_TYPE = ToolChoice.AUTO,
    **kwargs,
) -> ChatCompletionMessage:
    """Send a tool-calling chat request and return the model's message.

    Args:
        messages: Conversation to send.
        tools: Tool definitions offered to the model.
        tool_choice: How the model may pick a tool.
        **kwargs: Accepted for interface compatibility; not used here.

    Raises:
        TokenLimitExceeded: The prompt exceeds the token limit (not retried).
        ValueError: The API reply contains no choices.
        OpenAIError: The API call failed (logged, then re-raised).
    """
    try:
        # Refuse requests that would exceed the token limit.
        input_tokens = self.count_message_tokens(messages)
        if not self.check_token_limit(input_tokens):
            raise TokenLimitExceeded(self.get_limit_error_message(input_tokens))

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=tools,
            tool_choice=tool_choice,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            stream=False,  # tool calls are not streamed
        )

        # An empty reply would otherwise surface as an opaque IndexError.
        if not response.choices:
            raise ValueError("Empty response from LLM")

        # Record usage when the API reports it.
        if response.usage is not None:
            self.update_token_count(
                response.usage.prompt_tokens,
                response.usage.completion_tokens,
            )
        return response.choices[0].message
    except TokenLimitExceeded:
        raise  # excluded from the retry policy above
    except OpenAIError as e:
        logger.error(f"OpenAI API error: {e}")
        raise
#### 重试策略可视化:
第1次失败 ──▶ 等待 1-2秒 ──▶ 第2次尝试
第2次失败 ──▶ 等待 2-4秒 ──▶ 第3次尝试
第3次失败 ──▶ 等待 4-8秒 ──▶ 第4次尝试
...
第6次失败 ──▶ 抛出异常
## 2. 浏览器自动化实现
### 2.1 browser-use 集成
OpenManus 基于 browser-use 库实现浏览器自动化:
python
# app/tool/browser_use_tool.py
from browser_use import Browser as BrowserUseBrowser
from browser_use import BrowserConfig
from browser_use.browser.context import BrowserContext, BrowserContextConfig
class BrowserUseTool(BaseTool):
    """Tool declaration for browser automation built on browser-use."""

    name: str = "browser_use"
    description: str = "强大的浏览器自动化工具..."

    # Browser objects start out empty and are created lazily.
    browser: Optional[BrowserUseBrowser] = None
    context: Optional[BrowserContext] = None
    dom_service: Optional[DomService] = None

    # Lock used for concurrency control.
    lock: asyncio.Lock = Field(default_factory=asyncio.Lock)
### 2.2 浏览器初始化
python
async def _ensure_browser_initialized(self) -> BrowserContext:
    """Create the browser and its context on first use and return the context."""
    if self.browser is None:
        browser_config_kwargs = {
            "headless": False,  # run with a visible window
            "disable_security": True,  # turn off browser security restrictions
        }

        # Add proxy settings when the application config provides them.
        if config.browser_config and config.browser_config.proxy:
            browser_config_kwargs["proxy"] = ProxySettings(
                server=config.browser_config.proxy.server,
                username=config.browser_config.proxy.username,
                password=config.browser_config.proxy.password,
            )

        self.browser = BrowserUseBrowser(BrowserConfig(**browser_config_kwargs))

    if self.context is None:
        self.context = await self.browser.new_context(BrowserContextConfig())
        # The DOM service is bound to the context's current page.
        self.dom_service = DomService(await self.context.get_current_page())

    return self.context
### 2.3 操作执行实现
python
async def execute(
    self,
    action: str,
    url: Optional[str] = None,
    index: Optional[int] = None,
    text: Optional[str] = None,
    **kwargs,
) -> ToolResult:
    """Run one browser action and report the outcome as a ``ToolResult``.

    Args:
        action: Action name: ``go_to_url``, ``click_element``,
            ``input_text``, ``scroll_down``, ``scroll_up`` or
            ``extract_content``.
        url: Address to open for ``go_to_url``.
        index: Element index for ``click_element`` and ``input_text``.
        text: Text to type for ``input_text``.
        **kwargs: Action-specific extras (``scroll_amount``, ``goal``).

    Returns:
        A result with ``output`` on success or ``error`` on failure. Any
        exception raised while acting is converted into an error result.
    """
    async with self.lock:  # actions run one at a time
        try:
            context = await self._ensure_browser_initialized()

            # Navigation
            if action == "go_to_url":
                if not url:
                    return ToolResult(error="URL is required")
                page = await context.get_current_page()
                await page.goto(url)
                await page.wait_for_load_state()
                return ToolResult(output=f"Navigated to {url}")

            # Click
            elif action == "click_element":
                if index is None:
                    return ToolResult(error="Index is required")
                element = await context.get_dom_element_by_index(index)
                if not element:
                    return ToolResult(error=f"Element {index} not found")
                await context._click_element_node(element)
                return ToolResult(output=f"Clicked element at index {index}")

            # Text input
            elif action == "input_text":
                if index is None or not text:
                    return ToolResult(error="Index and text required")
                element = await context.get_dom_element_by_index(index)
                # Same missing-element check as click_element.
                if not element:
                    return ToolResult(error=f"Element {index} not found")
                await context._input_text_element_node(element, text)
                return ToolResult(output=f"Input '{text}' into element {index}")

            # Scrolling
            elif action in ("scroll_down", "scroll_up"):
                direction = 1 if action == "scroll_down" else -1
                amount = kwargs.get("scroll_amount")
                # int() ensures only a number is spliced into the script.
                amount = 500 if amount is None else int(amount)
                await context.execute_javascript(
                    f"window.scrollBy(0, {direction * amount});"
                )
                return ToolResult(output=f"Scrolled {action.split('_')[1]}")

            # Content extraction
            elif action == "extract_content":
                goal = kwargs.get("goal")
                if not goal:
                    return ToolResult(error="Goal is required")
                page = await context.get_current_page()
                import markdownify

                content = markdownify.markdownify(await page.content())
                # Ask the LLM to extract what the goal describes; only the
                # first 2000 characters of the page are sent.
                prompt = f"\n提取目标: {goal}\n页面内容:\n{content[:2000]}\n"
                response = await self.llm.ask_tool(
                    messages=[{"role": "system", "content": prompt}],
                    tools=[extraction_function],
                    tool_choice="required",
                )
                # ... parse the response ...

            # ... more actions ...

            else:
                # Report an unsupported action instead of returning None.
                return ToolResult(error=f"Unknown action: {action}")
        except Exception as e:
            return ToolResult(error=f"Browser action '{action}' failed: {str(e)}")
### 2.4 获取浏览器状态
python
async def get_current_state(self) -> ToolResult:
    """Return the browser state as JSON together with a base64 screenshot.

    Returns an error result when no context exists yet or when reading
    the state fails.
    """
    try:
        ctx = self.context
        if not ctx:
            return ToolResult(error="Browser not initialized")
        state = await ctx.get_state()

        # Take a screenshot of the current page.
        page = await ctx.get_current_page()
        await page.bring_to_front()
        await page.wait_for_load_state()
        screenshot = await page.screenshot(
            full_page=True,
            animations="disabled",
            type="jpeg",
            quality=100,
        )
        screenshot_b64 = base64.b64encode(screenshot).decode("utf-8")

        # Assemble the state description.
        state_info = {
            "url": state.url,
            "title": state.title,
            "tabs": [tab.model_dump() for tab in state.tabs],
            "interactive_elements": state.element_tree.clickable_elements_to_string(),
            "scroll_info": {
                "pixels_above": state.pixels_above,
                "pixels_below": state.pixels_below,
            },
        }
        return ToolResult(
            output=json.dumps(state_info, indent=4, ensure_ascii=False),
            base64_image=screenshot_b64,  # screenshot travels with the state
        )
    except Exception as e:
        return ToolResult(error=f"Failed to get state: {str(e)}")
## 3. MCP 协议实现
### 3.1 MCP 概述
Model Context Protocol (MCP) 是一种让 AI Agent 连接外部服务的标准协议:
┌────────────────────────────────────────────────────────┐
│ OpenManus Agent │
│ ┌──────────────────────────────────────────────────┐ │
│ │ MCPClients │ │
│ │ │ │
│ │ Session 1 Session 2 Session 3 │ │
│ │ ↓ ↓ ↓ │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │ SSE │ │STDIO│ │ SSE │ │ │
│ │ └──┬──┘ └──┬──┘ └──┬──┘ │ │
│ └──────┼───────────────┼───────────────┼──────────┘ │
└─────────┼───────────────┼───────────────┼──────────────┘
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 文件系统 │ │ 数据库 │ │ 第三方API│
│ MCP 服务 │ │ MCP 服务 │ │ MCP 服务 │
└──────────┘ └──────────┘ └──────────┘
### 3.2 MCPClients 实现
python
# app/tool/mcp.py
from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
class MCPClients(ToolCollection):
    """Manage connections to several MCP servers."""

    # NOTE(review): both dicts are declared on the class, so they are
    # shared by every instance unless a base class rebinds them - confirm
    # that this is intended.
    sessions: Dict[str, ClientSession] = {}
    exit_stacks: Dict[str, AsyncExitStack] = {}

    async def connect_sse(self, server_url: str, server_id: str = "") -> None:
        """Connect to a server over SSE (Server-Sent Events).

        Args:
            server_url: Endpoint of the server.
            server_id: Key for the connection; defaults to ``server_url``.

        Raises:
            ValueError: ``server_url`` is empty.
        """
        if not server_url:
            raise ValueError("Server URL is required.")
        server_id = server_id or server_url

        # Replace an existing connection with the same id.
        if server_id in self.sessions:
            await self.disconnect(server_id)

        exit_stack = AsyncExitStack()
        self.exit_stacks[server_id] = exit_stack
        try:
            streams = await exit_stack.enter_async_context(sse_client(url=server_url))
            session = await exit_stack.enter_async_context(ClientSession(*streams))
            self.sessions[server_id] = session
            # Initialise the session and register the server's tools.
            await self._initialize_and_list_tools(server_id)
        except Exception:
            # Do not leave a half-open connection registered.
            await self._discard_connection(server_id)
            raise

    async def connect_stdio(
        self, command: str, args: List[str], server_id: str = ""
    ) -> None:
        """Connect to a server over stdin/stdout by starting a subprocess.

        Args:
            command: Executable that starts the server.
            args: Arguments passed to ``command``.
            server_id: Key for the connection; defaults to ``command``.

        Raises:
            ValueError: ``command`` is empty.
        """
        if not command:
            raise ValueError("Server command is required.")
        server_id = server_id or command

        if server_id in self.sessions:
            await self.disconnect(server_id)

        exit_stack = AsyncExitStack()
        self.exit_stacks[server_id] = exit_stack
        try:
            server_params = StdioServerParameters(command=command, args=args)
            read, write = await exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            session = await exit_stack.enter_async_context(ClientSession(read, write))
            self.sessions[server_id] = session
            await self._initialize_and_list_tools(server_id)
        except Exception:
            await self._discard_connection(server_id)
            raise

    async def _discard_connection(self, server_id: str) -> None:
        """Forget a connection that failed to open and release what it acquired."""
        self.sessions.pop(server_id, None)
        exit_stack = self.exit_stacks.pop(server_id, None)
        if exit_stack is not None:
            await exit_stack.aclose()
### 3.3 工具发现与注册
python
async def _initialize_and_list_tools(self, server_id: str) -> None:
    """Initialise the session for ``server_id`` and register its tools.

    Raises:
        RuntimeError: No session is stored under ``server_id``.
    """
    session = self.sessions.get(server_id)
    if not session:
        raise RuntimeError(f"Session not found: {server_id}")

    await session.initialize()
    response = await session.list_tools()

    # Wrap every remote tool in a local proxy object.
    for tool in response.tools:
        original_name = tool.name
        # Prefix with the server id to avoid name clashes.
        tool_name = self._sanitize_tool_name(f"mcp_{server_id}_{original_name}")
        server_tool = MCPClientTool(
            name=tool_name,
            # A server may omit the description; pass an empty string
            # rather than None.
            description=tool.description or "",
            parameters=tool.inputSchema,
            session=session,
            server_id=server_id,
            original_name=original_name,
        )
        self.tool_map[tool_name] = server_tool

    self.tools = tuple(self.tool_map.values())
    logger.info(f"Connected to {server_id} with tools: {[t.name for t in response.tools]}")
### 3.4 MCPClientTool:工具代理
python
class MCPClientTool(BaseTool):
    """Proxy tool that forwards each call to a tool hosted on an MCP server."""

    session: Optional[ClientSession] = None
    server_id: str = ""
    original_name: str = ""  # name passed to the server when calling

    async def execute(self, **kwargs) -> ToolResult:
        """Call the remote tool with ``kwargs`` and return its text output.

        Returns an error result when there is no session, when the call
        raises, or when the server flags the reply as an error.
        """
        if not self.session:
            return ToolResult(error="Not connected to MCP server")
        try:
            result = await self.session.call_tool(self.original_name, kwargs)
            # Keep only the text items of the reply.
            content_str = ", ".join(
                item.text for item in result.content if isinstance(item, TextContent)
            )
            # A tool-level failure arrives as a normal reply flagged
            # isError; surface it as an error, not as output.
            if getattr(result, "isError", False):
                return ToolResult(error=f"MCP tool error: {content_str or 'No output'}")
            return ToolResult(output=content_str or "No output")
        except Exception as e:
            return ToolResult(error=f"MCP tool error: {str(e)}")
## 4. 代码执行沙箱
### 4.1 多进程隔离
PythonExecute 使用多进程来隔离代码执行:
python
# app/tool/python_execute.py
import multiprocessing
from io import StringIO
import sys
class PythonExecute(BaseTool):
    """Run a Python snippet in a separate process and capture what it prints."""

    name: str = "python_execute"
    description: str = "执行 Python 代码。注意:只有 print 输出可见"

    def _run_code(self, code: str, result_dict: dict, safe_globals: dict) -> None:
        """Execute ``code`` and store the outcome in ``result_dict``.

        Writes ``observation`` (captured stdout, or the exception text)
        and ``success``. Runs as the target of the child process.
        """
        original_stdout = sys.stdout
        try:
            # Capture everything the snippet prints.
            output_buffer = StringIO()
            sys.stdout = output_buffer
            # SECURITY: the namespace carries the full builtins, so the
            # snippet is not restricted in what it can do; the separate
            # process provides a timeout and crash isolation only.
            exec(code, safe_globals, safe_globals)
            result_dict["observation"] = output_buffer.getvalue()
            result_dict["success"] = True
        except Exception as e:
            result_dict["observation"] = str(e)
            result_dict["success"] = False
        finally:
            sys.stdout = original_stdout

    async def execute(self, code: str, timeout: int = 5) -> Dict:
        """Run ``code`` with a time limit.

        Args:
            code: Python source to execute.
            timeout: Seconds to wait before the child process is stopped.

        Returns:
            A dict with ``observation`` (str) and ``success`` (bool).
        """
        with multiprocessing.Manager() as manager:
            # Shared dict through which the child reports its result.
            result = manager.dict({"observation": "", "success": False})

            # Global namespace handed to exec().
            if isinstance(__builtins__, dict):
                safe_globals = {"__builtins__": __builtins__}
            else:
                safe_globals = {"__builtins__": __builtins__.__dict__.copy()}

            proc = multiprocessing.Process(
                target=self._run_code,
                args=(code, result, safe_globals),
            )
            proc.start()
            proc.join(timeout)  # wait at most ``timeout`` seconds

            # Still running: stop it and report a timeout.
            if proc.is_alive():
                proc.terminate()
                proc.join(1)
                if proc.is_alive():
                    # terminate() was not enough; do not leave the child
                    # running.
                    proc.kill()
                    proc.join(1)
                return {
                    "observation": f"Execution timeout after {timeout} seconds",
                    "success": False,
                }
            return dict(result)
#### 执行隔离示意:
┌─────────────────────────────────────────────┐
│ 主进程 (OpenManus) │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ multiprocessing.Process │ │
│ │ ┌─────────────────────────────┐ │ │
│ │ │ 子进程(沙箱) │ │ │
│ │ │ │ │ │
│ │ │ exec(code, safe_globals) │ │ │
│ │ │ │ │ │
│ │ │ stdout ──▶ StringIO │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────┘ │ │
│ │ ↓ │ │
│ │ Manager.dict (共享结果) │ │
│ └─────────────────────────────────────┘ │
│ ↓ │
│ 返回执行结果 │
└─────────────────────────────────────────────┘
### 4.2 文件操作沙箱
StrReplaceEditor 支持本地和沙箱两种模式:
python
# app/tool/str_replace_editor.py
class StrReplaceEditor(BaseTool):
    """File editing tool that works on the local disk or inside the sandbox."""

    _local_operator: LocalFileOperator = LocalFileOperator()
    _sandbox_operator: SandboxFileOperator = SandboxFileOperator()

    def _get_operator(self) -> FileOperator:
        """Return the sandbox operator when the config enables it, else the local one."""
        if config.sandbox.use_sandbox:
            return self._sandbox_operator
        return self._local_operator

    async def execute(
        self,
        *,
        command: Command,  # view, create, str_replace, insert, undo_edit
        path: str,
        **kwargs,
    ) -> str:
        """Dispatch ``command`` for ``path`` to the selected file operator.

        ``kwargs`` carries the command-specific values read below
        (``view_range``, ``file_text``, ``old_str``, ``new_str``).
        """
        operator = self._get_operator()

        # Check the path before doing anything.
        await self.validate_path(command, Path(path), operator)

        if command == "view":
            return await self.view(path, kwargs.get("view_range"), operator)
        elif command == "create":
            await operator.write_file(path, kwargs["file_text"])
            # Record the content in the per-path history.
            # (``_file_history`` is not defined in this excerpt.)
            self._file_history[path].append(kwargs["file_text"])
            return f"File created at: {path}"
        elif command == "str_replace":
            return await self.str_replace(
                path, kwargs["old_str"], kwargs.get("new_str"), operator
            )
        # ... more commands
## 5. PlanningFlow 实现
### 5.1 计划创建
python
# app/flow/planning.py
async def _create_initial_plan(self, request: str) -> None:
    """Ask the LLM to create the initial plan for ``request``.

    If the model does not produce a usable call to the planning tool, a
    generic three-step plan is created instead.
    """
    system_content = (
        "你是一个规划助手。创建简洁、可执行的计划。"
        "关注关键里程碑而非细节步骤。"
    )

    # With several executors, tell the model which agents it can assign.
    if len(self.executor_keys) > 1:
        agents_description = [
            {"name": key.upper(), "description": self.agents[key].description}
            for key in self.executor_keys
            if key in self.agents
        ]
        # ensure_ascii=False keeps non-ASCII descriptions readable in the
        # prompt instead of turning them into \u escapes.
        system_content += (
            f"\n可用 Agent: {json.dumps(agents_description, ensure_ascii=False)}"
        )
        system_content += "\n创建步骤时使用 '[AGENT_NAME]' 格式指定执行者。"

    response = await self.llm.ask_tool(
        messages=[Message.user_message(f"创建计划完成任务: {request}")],
        system_msgs=[Message.system_message(system_content)],
        tools=[self.planning_tool.to_param()],
        tool_choice=ToolChoice.AUTO,
    )

    # Execute the first well-formed call to the planning tool.
    if response and response.tool_calls:
        for tool_call in response.tool_calls:
            if tool_call.function.name != "planning":
                continue
            try:
                args = json.loads(tool_call.function.arguments)
            except json.JSONDecodeError:
                # Malformed arguments: skip this call and fall back to
                # the default plan if no other call is usable.
                logger.warning(
                    f"Could not parse planning arguments: {tool_call.function.arguments}"
                )
                continue
            args["plan_id"] = self.active_plan_id
            await self.planning_tool.execute(**args)
            return

    # The model did not use the planning tool: create a default plan.
    await self.planning_tool.execute(
        command="create",
        plan_id=self.active_plan_id,
        title=f"Plan for: {request[:50]}",
        steps=["分析请求", "执行任务", "验证结果"],
    )
### 5.2 步骤执行
python
async def _execute_step(self, executor: BaseAgent, step_info: dict) -> str:
    """Run the current plan step with ``executor`` and return its result.

    On failure the error is logged and returned as an ``"Error: ..."``
    string; the step is then not marked as completed.
    """
    plan_status = await self._get_plan_text()
    step_text = step_info.get("text", f"Step {self.current_step_index}")

    # Prompt: the plan so far, then the single step to carry out.
    step_prompt = (
        "\n"
        "当前计划状态:\n"
        f"{plan_status}\n"
        "你的当前任务:\n"
        f'正在执行步骤 {self.current_step_index}: "{step_text}"\n'
        "请只执行当前步骤,完成后提供摘要。\n"
    )

    try:
        step_result = await executor.run(step_prompt)
        await self._mark_step_completed()
        return step_result
    except Exception as e:
        logger.error(f"Step {self.current_step_index} failed: {e}")
        return f"Error: {str(e)}"
### 5.3 计划状态管理
python
class PlanStepStatus(str, Enum):
    """Possible states of a single plan step."""

    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    BLOCKED = "blocked"

    @classmethod
    def get_active_statuses(cls) -> List[str]:
        """Return the status values of steps that still need to be executed."""
        return [cls.NOT_STARTED.value, cls.IN_PROGRESS.value]

    @classmethod
    def get_status_marks(cls) -> Dict[str, str]:
        """Return the marker shown for each status value."""
        return {
            cls.COMPLETED.value: "[✓]",
            cls.IN_PROGRESS.value: "[→]",
            cls.BLOCKED.value: "[!]",
            cls.NOT_STARTED.value: "[ ]",
        }
async def _get_current_step_info(self) -> tuple[Optional[int], Optional[dict]]:
    """Find the next step to execute and mark it as in progress.

    Returns:
        ``(index, step_info)`` for the first step whose status is still
        active, where ``step_info`` holds the step ``text`` and, when the
        text contains a tag such as ``[MANUS]``, its lower-cased ``type``.
        ``(None, None)`` when the plan is missing or no step is active.
    """
    import re

    plan_data = self.planning_tool.plans.get(self.active_plan_id)
    if not plan_data:
        return None, None

    steps = plan_data.get("steps", [])
    step_statuses = plan_data.get("step_statuses", [])
    # Tag such as [MANUS] or [DATA_ANALYSIS]; compiled once, not per step.
    type_pattern = re.compile(r"\[([A-Z_]+)\]")

    for i, step in enumerate(steps):
        # Steps without a recorded status count as not started.
        if i < len(step_statuses):
            status = step_statuses[i]
        else:
            status = PlanStepStatus.NOT_STARTED.value
        if status not in PlanStepStatus.get_active_statuses():
            continue

        step_info = {"text": step}
        type_match = type_pattern.search(step)
        if type_match:
            step_info["type"] = type_match.group(1).lower()

        # Mark the step as in progress before handing it out.
        await self.planning_tool.execute(
            command="mark_step",
            plan_id=self.active_plan_id,
            step_index=i,
            step_status=PlanStepStatus.IN_PROGRESS.value,
        )
        return i, step_info

    return None, None  # no active step is left
## 6. 防循环机制
### 6.1 检测重复响应
python
# app/agent/base.py
def is_stuck(self) -> bool:
    """Return True when the latest message repeats earlier assistant output.

    The content of the last message is compared with every earlier
    assistant message; the agent counts as stuck once the number of
    identical ones reaches ``self.duplicate_threshold``.
    """
    messages = self.memory.messages
    if len(messages) < 2:
        return False

    last_message = messages[-1]
    if not last_message.content:
        return False

    # Count earlier assistant messages with exactly the same content.
    duplicate_count = sum(
        1
        for msg in messages[:-1]
        if msg.role == "assistant" and msg.content == last_message.content
    )
    return duplicate_count >= self.duplicate_threshold  # threshold defaults to 2
### 6.2 处理卡住状态
python
def handle_stuck_state(self):
    """Prepend a change-of-strategy hint to the next-step prompt and log it."""
    stuck_prompt = (
        "检测到重复响应。请考虑新策略,避免重复已尝试过的无效路径。"
    )
    self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
    logger.warning(f"Agent stuck detected. Added prompt: {stuck_prompt}")
#### 防循环流程:
┌─────────────────────────────────────────┐
│ 执行步骤 │
│ ↓ │
│ is_stuck() 检测 │
│ / \ │
│ 否 是 │
│ ↓ ↓ │
│ 继续执行 handle_stuck_state() │
│ ↓ │
│ 修改提示词 │
│ ↓ │
│ 下一步尝试新策略 │
└─────────────────────────────────────────┘
## 7. 资源清理
### 7.1 Agent 清理
python
# app/agent/toolcall.py
async def cleanup(self):
    """Call ``cleanup()`` on every available tool that defines it as a coroutine.

    A failure in one tool is logged and does not stop the others.
    """
    logger.info(f"🧹 Cleaning up resources for '{self.name}'...")
    for tool_name, tool_instance in self.available_tools.tool_map.items():
        # Skip tools without an async cleanup method.
        has_async_cleanup = hasattr(
            tool_instance, "cleanup"
        ) and asyncio.iscoroutinefunction(tool_instance.cleanup)
        if not has_async_cleanup:
            continue
        try:
            await tool_instance.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up tool '{tool_name}': {e}")
    logger.info(f"✨ Cleanup complete for '{self.name}'.")
async def run(self, request: Optional[str] = None) -> str:
    """Run the agent, then clean up whether or not the run succeeded."""
    try:
        return await super().run(request)
    finally:
        await self.cleanup()
### 7.2 浏览器清理
python
# app/tool/browser_use_tool.py
async def cleanup(self):
"""清理浏览器资源"""
async with self.lock:
if self.context is not None:
await self.context.close()
self.context = None
self.dom_service = None
if self.browser is not None:
await self.browser.close()
self.browser = None
def __del__(self):
    """Best-effort cleanup when the tool object is destroyed."""
    if self.browser is None and self.context is None:
        return

    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread.
        running_loop = None

    if running_loop is not None:
        # asyncio.run() and run_until_complete() both raise while a loop
        # is running, so hand the cleanup to that loop instead.
        running_loop.create_task(self.cleanup())
    else:
        # asyncio.run() creates a loop, runs the cleanup and closes it.
        asyncio.run(self.cleanup())
## 8. 实现细节总结
| 功能 | 实现要点 | 关键技术 |
|---|---|---|
| LLM 调用 | 单例模式、Token 计数、重试机制 | tenacity, tiktoken |
| 浏览器自动化 | 懒加载、并发锁、状态截图 | browser-use, Playwright |
| MCP 连接 | SSE/STDIO 双模式、工具代理 | mcp SDK, AsyncExitStack |
| 代码执行 | 多进程隔离、超时控制、输出捕获 | multiprocessing |
| 任务规划 | 步骤分解、状态跟踪、多 Agent 调度 | PlanningTool, 状态机 |
| 防循环 | 重复检测、策略调整提示 | 内容相等比较 |
| 资源管理 | 显式清理、析构保护 | async cleanup, `__del__` |
下一章:22.5 使用指南 - 安装配置与实践操作